//package com.xdl.apitest.tabletest
//
//import com.xdl.apitest.SensorReading
//import org.apache.flink.streaming.api.scala._
//import org.apache.flink.table.api.{DataTypes, Table}
//import org.apache.flink.table.api.scala._
//import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
//
///**
// * Project: FlinkTutorial
// * Package: com.xdl.apitest
// * Version: 1.0
// *
// * Created by guoxiaolong on 2020-08-01-16:21
// */
//object KafkaPipelineTest {
//  def main(args: Array[String]): Unit = {
//
//    //0.创建流执行环境，读取数据并转换成样例类
//    val env = StreamExecutionEnvironment.getExecutionEnvironment
//    env.setParallelism(1)
//
//    // 1.创建表执行环境
//    val tableEnv = StreamTableEnvironment.create(env)
//
//    //2.定义到Kafka的连接、输入表
//    tableEnv.connect(new kafka()
//      .version("0.11") //定义版本
//      .topic("sensor") //定义主题
//      .property("zookeeper.connect", "localhost:2181")
//      .property("bootstrap.servers", "localhost:9092")
//    )
//      .withFormat(new Csv())
//      .withScahema(new Schema()
//        .field("id", DataTypes.STRING())
//        .field("timestamp", DataTypes.BIGINT())
//        .field("temperature", DataTypes.DOUBLE())
//      )
//      .createTemporaryTable("KafkaInputTable") //在表环境注册一张表
//
//
//    //3. 表的查询转换
//    val sensorTable: Table = tableEnv.from("inputTable")
//    //3.1 简单查询转换
//    val resultTable: Table = sensorTable
//      .select("id, temperature")
//      .filter('id === "snesor_1")
//
//    //3.2 聚合转换
//    val aggResultTable: Table = sensorTable
//      .groupBy('id)
//      .select('id, 'id.count as 'count)
//
//    tableEnv.connect(new kafka()
//      .version("0.11") //定义版本
//      .topic("sinkTest") //定义主题
//      .property("zookeeper.connect", "localhost:2181")
//      .property("bootstrap.servers", "localhost:9092")
//    )
//      .withFormat(new Csv())
//      .withScahema(new Schema()
//        .field("id", DataTypes.STRING())
//        .field("temp", DataTypes.DOUBLE())
//      )
//      .createTemporaryTable("KafkaOutputTable")
//
//    resultTable.insertInto("KafkaOutputTable")
//
//    env.execute("Kafka pipeline test job")
//  }
//}
